package com.changgou.canal;

import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.changgou.util.CanalUtil;
import com.xpand.starter.canal.annotation.*;
import org.junit.After;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.Map;

/**
 * Canal listener for the advertisement table ({@code changgou_business.tb_ad}).
 *
 * <p>For every inserted, updated or deleted row, the value of the row's
 * {@code position} column is published to RabbitMQ with an empty exchange name
 * and the routing key {@code ad_update_queue}. Presumably a downstream consumer
 * uses that value to refresh the ads shown at the given position — confirm
 * against the consumer of the queue.
 *
 * @author 戴金华
 * @date 2019-12-06 16:56
 */
@CanalEventListener
public class AdListener {

    /** Routing key used for every notification sent by this listener. */
    private static final String AD_UPDATE_QUEUE = "ad_update_queue";

    /** Name of the {@code tb_ad} column whose value is sent as the message payload. */
    private static final String POSITION_COLUMN = "position";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Handles an inserted ad row. Only the "after" image is read, since an
     * insert has no previous row state.
     *
     * @param eventType type of the current operation (not used)
     * @param rowData   the row that was inserted
     */
    @InsertListenPoint(schema = "changgou_business", table = "tb_ad")
    public void onEventInsert(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        publishPosition(positionOf(CanalUtil.convertToMap(rowData.getAfterColumnsList())));
    }

    /**
     * Handles a deleted ad row. Only the "before" image is read, since a
     * deleted row has no new state.
     *
     * @param eventType type of the current operation (not used)
     * @param rowData   the row that was deleted
     */
    @DeleteListenPoint(schema = "changgou_business", table = "tb_ad")
    public void onEventDelete(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        publishPosition(positionOf(CanalUtil.convertToMap(rowData.getBeforeColumnsList())));
    }

    /**
     * Handles an updated ad row.
     *
     * <p>The position stored after the update is always published. If the
     * update changed the {@code position} column, the previous position is
     * published as well (first), because the ad no longer belongs to it and
     * anything derived from the old position would otherwise stay stale.
     *
     * @param eventType type of the current operation (not used)
     * @param rowData   the row that was updated, carrying both its before and after images
     */
    @UpdateListenPoint(schema = "changgou_business", table = "tb_ad")
    public void update(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        String previousPosition = positionOf(CanalUtil.convertToMap(rowData.getBeforeColumnsList()));
        String currentPosition = positionOf(CanalUtil.convertToMap(rowData.getAfterColumnsList()));

        // Notify the old position only when the ad actually moved; otherwise the
        // single message for the current position is enough.
        if (previousPosition != null && !previousPosition.equals(currentPosition)) {
            publishPosition(previousPosition);
        }
        publishPosition(currentPosition);
    }

    /**
     * Reads the {@code position} value from a column-name-to-value map.
     *
     * @param columns map produced by {@code CanalUtil.convertToMap}; a {@code null}
     *                map is tolerated defensively (NOTE(review): whether the helper
     *                can return {@code null} is not visible from this file)
     * @return the position value, or {@code null} when the map or the column is absent
     */
    private static String positionOf(Map<String, String> columns) {
        return columns == null ? null : columns.get(POSITION_COLUMN);
    }

    /**
     * Sends one position value to RabbitMQ.
     *
     * @param position the position to publish; {@code null} is skipped because it
     *                 carries no position to refresh and is expected to be rejected
     *                 by the message converter
     */
    private void publishPosition(String position) {
        if (position == null) {
            return;
        }
        rabbitTemplate.convertAndSend("", AD_UPDATE_QUEUE, position);
    }
}
